In practice, concurrency means making progress on multiple tasks over the same period of time. This doesn't necessarily guarantee any sort of speed-up or performance gain! For instance, on a single CPU, concurrency is still possible, but it is only achieved by the processor switching back and forth between tasks. This does not result in a performance gain (in fact, it often results in a loss of performance due to the overhead of saving and restoring each task's state every time the processor switches).
The above example of concurrency on a single core is sort of a fringe case; in general we're interested in systems with multiple cores, i.e. the ability to run things in parallel. Some typical use cases include...
So what does any of this have to do with Python? There are many cases where concurrency in Python is very useful; the GUI and streaming data cases, for example. The most common application for scientists and engineers is speeding up the execution of some code by parallelizing it, taking advantage of all (or at least more) of the computing power of multi-core systems.
Python has tools that allow the programmer to take full advantage of their computing system. The performance gain that you can get from concurrency in typical scientific Python scenarios (e.g. simulating or crunching data on a personal computer with 2 to 16 cores) depends on the type of problem and the number of cores.
You want to make sure you're using the right tool for the job. If you are looking for major performance gains (10x - 10000x), concurrent programming in python will not get you there. For these cases you're better off
A good rule-of-thumb regarding when to go about parallelizing your python code is the "cup-of-tea" threshold. If your code currently runs on the order of 5-10 minutes (i.e. long enough for you to get up and focus on something else like making a cup of tea) but you're not ready to invest a ton of time in writing the code in a low-level language (premature optimization is the root of all evil - Knuth), then parallelizing your current code is a good approach, especially if it's naively parallel!
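To put rough numbers on when parallelizing is worth it, one common back-of-the-envelope estimate is Amdahl's law: if a fraction p of the run time can be spread over n cores, the best-case speed-up is 1 / ((1 - p) + p / n). The sketch below is only an illustration; the function name `amdahl_speedup` and the 90% parallel fraction are assumptions, not measurements from this demo.

```python
def amdahl_speedup(parallel_fraction, num_cores):
    """Best-case speed-up predicted by Amdahl's law.

    parallel_fraction: share of the run time that can be spread over cores (0 to 1)
    num_cores: number of cores the parallel part is spread over
    """
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / float(num_cores))


# Hypothetical example: a 10 minute job where 90% of the time is parallelizable
for cores in (2, 4, 16):
    speedup = amdahl_speedup(0.9, cores)
    print("%2d cores: %.2fx speed-up, about %.1f minutes" % (cores, speedup, 10.0 / speedup))
```

Under these assumptions even 16 cores only give about a 6.4x speed-up, which is why concurrency alone won't deliver the 10x - 10000x gains mentioned above.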
This demo will focus on two tools within the python standard library - the threading module, and the multiprocessing module. We'll cover when to use one module vs. the other, and give some examples to demonstrate how they work.
There are many other tools that can be used to implement concurrent programs; most notably tools associated with distributed computing, i.e. message passing to computers connected via networking. These tools, such as sockets, 0MQ (pyzmq), or xmlrpc, can be used to set up concurrent jobs on either multiple PCs or even on a single PC (using localhost as the network). In order to keep this talk a reasonable length, we won't be discussing these tools here; however, this doesn't mean that they aren't incredibly useful!
In [ ]:
import threading
There are several ways to spawn a thread...
In [ ]:
# Subclass the threading.Thread object and override its run() method with your code
class ExampleCountingThread(threading.Thread):
    def __init__(self, countdown=10000, modulo=1000):
        # If you are going to have your own constructor, make sure you call the parent constructor too!
        # Thread.__init__(self)
        super(ExampleCountingThread, self).__init__()
        self.value = countdown
        self.print_interval = modulo

    def run(self):
        # A simple example that counts down from the starting value and prints the value when the modulus with the print interval is 0
        while self.value > 0:
            if self.value % self.print_interval == 0:
                print self.value
            self.value -= 1
        print '%s done processing' % self.name
In [ ]:
# Run the thread
t1 = ExampleCountingThread(100000, 10000)
# When you call thread.start(), the run() method is invoked
t1.start()
# Wait for thread t1 to exit - you may get funky behavior
# if you don't join your threads. Join can only be called
# from another thread (i.e. a thread can't join itself)
t1.join()
In [ ]:
# Or, we can pass the function we want to run in as a target function
def countdown(value=10000, modulo=1000, pf=True):
    '''This is the same function as implemented in the run() method above'''
    while value > 0:
        if value % modulo == 0:
            print value
        value -= 1
    if pf: print 'Done processing!'
    return
In [ ]:
# Run the thread
t2 = threading.Thread(target=countdown, args=(100000,10000))
t2.start()
t2.join()
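Because threads live in the same process, they can all see the same Python objects, which is the simplest way to get a result back out of a thread (the return value of the target function is discarded). A minimal sketch, assuming a hypothetical `count_multiples` worker and a shared `results` dict; the lock keeps updates to the shared container from interleaving:

```python
import threading

results = {}                      # shared between all threads in this process
results_lock = threading.Lock()   # guards updates to the shared dict


def count_multiples(name, limit, modulo):
    """Count how many integers in [1, limit] are multiples of modulo."""
    count = 0
    value = limit
    while value > 0:
        if value % modulo == 0:
            count += 1
        value -= 1
    # A thread cannot "return" a value to its creator, so store it instead
    with results_lock:
        results[name] = count


threads = [
    threading.Thread(target=count_multiples, args=("by_10", 1000, 10)),
    threading.Thread(target=count_multiples, args=("by_7", 1000, 7)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()   # after join() returns, that thread's result is in the dict
```

After the joins, `results` holds one entry per thread, keyed by the name passed in.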
If you have at least 2 processors, you'd expect 2 threads running the same function concurrently to take about as long as one processor takes to run the function once (a bit longer in practice, since starting and coordinating the threads is serial overhead of the kind Amdahl's law describes), while doing twice the amount of work. Let's try it:
In [ ]:
%%timeit
# Sequential example: Count down from 10000000 twice in a row. Note: suppress in-loop printing with larger modulo
countdown(10000000, 1e8, pf=False) # First run
countdown(10000000, 1e8, pf=False) # Second run
In [ ]:
%%timeit
# Now, let's try concurrency with threads. We expect two threads running the countdown function at once to run in about 50%
# of the time it took the sequential version to run
t1 = threading.Thread(target=countdown, args=(10000000, 1e8, False))
t2 = threading.Thread(target=countdown, args=(10000000, 1e8, False))
t1.start(); t2.start()
t1.join(); t2.join()
How is it that the concurrent example took LONGER than the sequential one? Those who are parallel-savvy might say that maybe the overhead for creating a thread in Python is massive. But a threading library with thread overhead THAT massive would be pretty useless (on my home desktop, the timeit results would imply it takes 3 seconds to spawn each thread).
That's not actually what's going on, however, which brings us to the big problem with the threading module:
In [ ]:
from IPython.display import YouTubeVideo
YouTubeVideo('a1Y73sPHKxw', autoplay=True)
The Global Interpreter Lock, or GIL for short, is a construct implemented within the Python interpreter itself that ensures that any piece of code that is currently running has full and exclusive access to the machinery of the interpreter. This means that two threads (which are indeed true threads, either POSIX or Windows threads depending on your system) can't execute Python code at the same time within a single interpreter. I don't want to go into any details, because one can fall down the rabbit-hole very rapidly here. Suffice it to say that the GIL prevents multiple pieces of Python code from running at the same time within the same interpreter. This has nothing to do with hardware (i.e. you could have 7 idle cores just waiting to be used; the GIL will not let you use them), but is a limitation built into CPython itself.
Rather than talk about the merits/nitty-gritty of the GIL, let's instead move on to a solution that works around it: the multiprocessing module
The GIL limitation might make it seem that the threading module is entirely useless for concurrent applications. This is not entirely so. The GIL really only causes problems for CPU-bound applications. The GIL has a quirk: it is actually released during blocking I/O.
In [ ]:
from IPython.display import Image
embed = Image('http://image.slidesharecdn.com/introconcurrent-100925150728-phpapp02/95/an-introduction-to-python-concurrency-85-728.jpg')
embed
This quirk means that despite the GIL, the threading module is still useful for I/O bound applications. One such application would be a server, where you might have hundreds or thousands of threads waiting for input. As long as the server doesn't do anything computationally intensive when it gets a request, it will do a fine job handling tons of concurrent requests.
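A quick way to see this is to let threads wait instead of compute. In the sketch below, `time.sleep` stands in for a blocking read or network call (the `fake_request` name and the 0.2 second delay are made up for illustration); because the GIL is released while a thread is waiting, the waits overlap:

```python
import threading
import time


def fake_request(delay):
    """Stand-in for blocking I/O; time.sleep releases the GIL while waiting."""
    time.sleep(delay)


def run_sequential(n, delay):
    """Handle n fake requests one after another and return the elapsed time."""
    tic = time.time()
    for _ in range(n):
        fake_request(delay)
    return time.time() - tic


def run_threaded(n, delay):
    """Handle n fake requests in n threads and return the elapsed time."""
    tic = time.time()
    threads = [threading.Thread(target=fake_request, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - tic


sequential_time = run_sequential(4, 0.2)   # roughly 4 * 0.2 seconds
threaded_time = run_threaded(4, 0.2)       # roughly 0.2 seconds, since the waits overlap
```

Swapping the sleep for a CPU-bound loop like `countdown` would bring back the behavior seen earlier, since then the threads would be competing for the GIL rather than waiting.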
Another note: the Python interpreter does not handle thread scheduling at all: this is left up to the host operating system. Thus there is no additional performance degradation from Python when it comes to thread handling; in other words, in I/O bound applications where the GIL doesn't interfere as much, the thread accessing and scheduling shouldn't be any worse than it would be if you implemented the same application using C and pthreads for example.
Recall that the GIL only affects code running within a single interpreter. To get around this limitation, the multiprocessing module allows you to spawn "threads" in separate processes. In other words, each "thread" or "process" you spawn is actually a separate instance of a python interpreter!
In [ ]:
from multiprocessing import Process
In [ ]:
# Look at the system monitor to see all of the individual processes
import time
def wait_then_print(num, wait=2.0):
    '''Sleep for "wait" seconds then print the given number'''
    time.sleep(wait)
    print 'Process %s done waiting!' % num
    return
# Create a list of processes
processes = []
for i in range(4): processes.append(Process(target=wait_then_print, args=(i, 5.0)))
# Start each of the processes
for p in processes: p.start()
# Join the processes
for p in processes: p.join()
In [ ]:
%%timeit
# Sequential example: Count down from 10000000 twice in a row. Note: suppress in-loop printing with larger modulo
countdown(10000000, 1e8, pf=False) # First run
countdown(10000000, 1e8, pf=False) # Second run
In [ ]:
%%timeit
# Now, let's try concurrency with processes. We expect two processes running the countdown function at once to run in about 50%
# of the time it took the sequential version to run
p1 = Process(target=countdown, args=(10000000, 1e8, False))
p2 = Process(target=countdown, args=(10000000, 1e8, False))
p1.start(); p2.start()
p1.join(); p2.join()
Another thing you might notice from the above example is that the syntax for using multiprocessing and threading is nearly identical. This is great news! You don't need to learn a whole new syntax to use the module - if you had been practicing with threading and not getting results, you can just as easily try multiprocessing by switching out Thread with Process (and maybe a few other minor tweaks).
Since each process that is spawned is running in its own Python interpreter, the shared-memory model that we had for the threading module doesn't apply here (since the different processes don't have access to the same data). Thus the multiprocessing module uses a distributed memory model, where information is exchanged between processes via message passing.
The two main tools for passing data between processes are pipes and queues.
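As a sketch of how interchangeable the two are, the helper below takes the worker class as an argument and starts the same target function either way. The names `report_pid` and `collect_pids` are hypothetical; a multiprocessing `Queue` is used to send results back because it works from both threads and processes.

```python
import os
import threading
import multiprocessing


def report_pid(out_queue):
    """Tell the parent which process this worker is running in."""
    out_queue.put(os.getpid())


def collect_pids(worker_cls, num_workers=3):
    """Run report_pid in num_workers workers of the given class and gather the pids."""
    out_queue = multiprocessing.Queue()
    workers = [worker_cls(target=report_pid, args=(out_queue,)) for _ in range(num_workers)]
    for w in workers:
        w.start()
    # Drain the queue before joining so no worker is left waiting to hand over data
    pids = [out_queue.get(timeout=30) for _ in workers]
    for w in workers:
        w.join()
    return pids


# Threads all live in the parent process, so every reported pid matches ours
thread_pids = collect_pids(threading.Thread)
```

Calling `collect_pids(multiprocessing.Process)` instead should report pids that differ from the parent's, since each worker is then a separate process. In a script (as opposed to a notebook), make that call under an `if __name__ == '__main__':` guard so it also works on platforms that start processes by launching a fresh interpreter.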
Note: It is also possible to set up shared-memory that can be accessed by multiple processes; however, the multiprocessing documentation recommends against this if at all possible; I personally never do it.
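Queues get used in the image processing example further down, so here is a minimal sketch of the other tool, a pipe. `Pipe()` returns two connected endpoints; whatever one end sends, the other end receives. The worker function, the helper name `exchange_squares`, and the use of `None` as a stop signal are all choices made for this illustration.

```python
from multiprocessing import Process, Pipe


def square_worker(conn):
    """Receive numbers over the pipe and send back their squares until told to stop."""
    while True:
        item = conn.recv()
        if item is None:      # None is the (arbitrary) stop signal in this sketch
            break
        conn.send(item * item)
    conn.close()


def exchange_squares(numbers):
    """Start one worker process and trade messages with it over a pipe."""
    parent_conn, child_conn = Pipe()           # two connected, two-way endpoints
    worker = Process(target=square_worker, args=(child_conn,))
    worker.start()
    squares = []
    for n in numbers:
        parent_conn.send(n)                    # message to the worker
        squares.append(parent_conn.recv())     # blocks until the worker replies
    parent_conn.send(None)                     # ask the worker to exit
    worker.join()
    return squares
```

From a notebook cell, `exchange_squares([1, 2, 3])` should give `[1, 4, 9]`; in a script, make the call under an `if __name__ == '__main__':` guard.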
Let's do some concurrent image processing on a single image. In this example, we want to apply an edge detection filter (the Canny filter) to the image with different degrees of Gaussian blurring.
In [ ]:
%pylab
import time
In [ ]:
# First, import an image to process
from skimage import data
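# Note: this import matches older scikit-image releases; newer ones expose canny as skimage.feature.canny
from skimage.filter import canny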
img = data.camera()
imshow(img, cmap=cm.Greys_r)
In [ ]:
from multiprocessing import Process, Queue
# Let's use the object-oriented paradigm for setting up the processes
class FilterProcess(Process):
    def __init__(self, inq, outq):
        '''Each process must be given an input queue and an output queue; the sigma for the filter arrives with each image'''
        Process.__init__(self) # Make sure to call the parent constructor!!!!
        self.input_queue = inq
        self.output_queue = outq

    def run(self):
        # Keep checking the input queue for images until it gets a special "STOP" key
        for image, sigma in iter(self.input_queue.get, "STOP"):
            # Apply filter
            edges = canny(image, sigma)
            # Put the result on the output queue, tagged with the sigma that produced it
            self.output_queue.put((edges, sigma))
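The `iter(self.input_queue.get, "STOP")` call in `run()` uses the two-argument form of `iter`, which keeps calling the given function until it returns a value equal to the sentinel. A minimal single-process sketch of that idiom (the helper name `drain_until_stop` and the placeholder jobs are hypothetical):

```python
from multiprocessing import Queue


def drain_until_stop(queue, sentinel="STOP"):
    """Collect items from the queue until the sentinel shows up."""
    items = []
    # iter(callable, sentinel) calls queue.get() repeatedly and stops
    # (without yielding it) as soon as the returned value equals the sentinel
    for item in iter(queue.get, sentinel):
        items.append(item)
    return items


jobs = Queue()
for job in [("image_a", 1), ("image_b", 2)]:
    jobs.put(job)
jobs.put("STOP")

drained = drain_until_stop(jobs)
```

Each loop consumes exactly one sentinel, which is why one "STOP" has to be put on the queue per worker when shutting a group of workers down.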
In [ ]:
# Now let's set up a group of processes to apply the filters. Let's look at 10 different filter levels
# Setup the queues for passing data to and from the FilterProcesses
to_workers = Queue()
from_workers = Queue()
# Choose how many workers you want to set up
num_processes = 2
# Make a list of processes
processes = []
for i in range(num_processes): processes.append(FilterProcess(to_workers, from_workers))
# Start the processes
for p in processes: p.start()
# Now the processes are waiting for us to send them an image on the to_workers queue
In [ ]:
# Set up the filter sigmas and the output container
filter_widths = arange(0,10)
edges = np.zeros((img.shape[0], img.shape[1], len(filter_widths)))
# Some utility functions to help us stop the processes when we're done, and plot the filtered results
def stop_processes(process_list):
    # Each worker consumes exactly one "STOP", so queue one per process
    for i in range(num_processes): to_workers.put("STOP")
    for p in process_list: p.join()

def show_edges(edges):
    fig, ax = subplots(2,5)
    for i, a in enumerate(ax.ravel()):
        a.imshow(edges[:,:,i], cmap=cm.Greys_r)
Now we can compare the results of running the edge filter sequentially or in parallel
In [ ]:
# Single process
tic = time.time()
for i, sigma in enumerate(filter_widths):
    edges[:,:,i] = canny(img, sigma)
toc = time.time()
print "%.5f" %(toc-tic)
In [ ]:
show_edges(edges)
edges = np.zeros((img.shape[0], img.shape[1], len(filter_widths)))
In [ ]:
# Concurrent Processes
tic = time.time()
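for i, sigma in enumerate(filter_widths):
    to_workers.put((img, sigma))
for i in range(len(filter_widths)):
    # Results arrive in whatever order the workers finish, so use the returned sigma
    # (which doubles as the index here, since filter_widths = arange(0, 10)) to place each one
    result, sigma = from_workers.get()
    edges[:,:,int(sigma)] = result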
toc = time.time()
print "%.5f" %(toc-tic)
In [ ]:
show_edges(edges)
Note that the processes are still waiting for new input on the queue (since we haven't given them the stop signal yet), so we can analyze new images if we want to.
In [ ]:
img2 = data.moon()
edges = np.zeros((img2.shape[0], img2.shape[1], len(filter_widths)))
# Analyze new image
tic = time.time()
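for i, sigma in enumerate(filter_widths):
    to_workers.put((img2, sigma))
for i in range(len(filter_widths)):
    # Results arrive in whatever order the workers finish, so use the returned sigma
    # (which doubles as the index here, since filter_widths = arange(0, 10)) to place each one
    result, sigma = from_workers.get()
    edges[:,:,int(sigma)] = result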
toc = time.time()
print "%.5f" %(toc-tic)
show_edges(edges)
In [ ]:
stop_processes(processes)
There are many features in the multiprocessing library that I'm not covering, but one feature that I do think warrants explicit mention (I use it regularly) is Pool.
multiprocessing.Pool is a simple, high-level interface for mapping work out to many worker processes, then collecting the results. This is very useful for speeding up problems that are "naively parallel".
Let's go back to last week for an example. Say we want to do a Monte Carlo simulation of particle diffusion, and we've already been smart about vectorizing our problem, but it still runs too slowly...
In [ ]:
import sys
sys.path.append('..')
In [ ]:
from numpyVectorization.motion_gauss import simulateParticles_loop
In [ ]:
%%timeit
# Serial example
num_particles = 100000
num_steps = 100
steps, trajectories = simulateParticles_loop(num_particles, num_steps, showPlots=False) # Disable graphics
Pool allows us to map the simulation out (the trajectories don't depend on each other, so the problem is "naively parallel") to as many processes as we'd like.
Before we dive into the parallel example, let's look at the basics of Pool
In [ ]:
from multiprocessing import Pool, cpu_count # A helper function that determines how many cores you have
# Find out how many cpus you have
num_cpus = cpu_count()
print 'My computer has %s processors' %(num_cpus)
# Make a dummy function that we can send to the workers in the pool
def print_num(num):
    print 'The number is: ', num
    return num
# Make the pool object
p = Pool(processes=num_cpus)
# Make some input to map out to the workers
nums_to_print = range(num_cpus)
# Here's where we farm out the work to the workers in the pool.
#results = p.map(print_num, nums_to_print)
results = p.map_async(print_num, nums_to_print)
print results
In [ ]:
# The asynchronous result object has some useful methods
print results.ready() # Return whether the results have been produced by all the workers
print results.successful() # Returns whether each of the calls was successful. If any of the workers raises an exception, this is false
# Note: successful() itself raises an error if it is called before the results are ready
# To actually retrieve the result from the async result object, use get
results = results.get()
print results
In [ ]:
# Shut down the pool
# First, you have to close (or terminate)
p.close()
In [ ]:
# Then you can join
p.join()
Pool has two main methods for using the workers: apply and map. They mirror the built-in functions apply and map, except they use the processes in the pool rather than the main process.
There are synchronous and asynchronous versions of each. The only difference is that the synchronous calls are blocking (i.e. the process that created the pool will be blocked until ALL of the workers return) while the asynchronous calls aren't. The asynchronous calls (map_async or apply_async) return an AsyncResult object like the one we saw above. You can poll the state of this object and acquire the results using get() when they're ready.
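The cells in this demo use map and map_async, so here is a minimal sketch of apply_async for comparison. It submits one call at a time and hands back one result object per call. The `cube` function and the `cube_all_async` helper are hypothetical; the helper takes the pool as an argument so that it can be reused with any pool you have already created.

```python
from multiprocessing import Pool


def cube(x):
    """Toy stand-in for a more expensive computation."""
    return x ** 3


def cube_all_async(pool, numbers, timeout=30):
    """Submit one apply_async call per number, then gather the results in order."""
    # apply_async returns immediately with a result object for that single call
    pending = [pool.apply_async(cube, (n,)) for n in numbers]
    # ... the parent is free to do other work here ...
    # get() blocks until that particular call has finished (or the timeout expires)
    return [result.get(timeout) for result in pending]


# Usage (in a notebook cell, or under an `if __name__ == "__main__":` guard in a script):
#     p = Pool(processes=3)
#     cubes = cube_all_async(p, [1, 2, 3])
#     p.close(); p.join()
```

With the usage shown in the comment, `cubes` should come back as `[1, 8, 27]`, in submission order, because each result is fetched from its own result object.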
In [ ]:
# Synchronous vs. Asynchronous
import time
# Make a dummy function that will "compute" for a while
def dummy(t):
    time.sleep(t)
    print "Done sleeping"
# Set up a pool
p = Pool(processes=3) # You don't HAVE to use the number of cores you have available
In [ ]:
# Synchronous
map_result = p.map(dummy, [5]*3)
print "I couldn't do anything until the workers were done, because this was a synchronous call"
In [ ]:
# Asynchronous
amap_result = p.map_async(dummy, [5]*3)
print "I on the other hand, can do whatever I want!"
print "Are my results ready?", amap_result.ready()
print "Okay, in the mean time; Here, have some random numbers"
print np.random.rand(10,10)
In [ ]:
print "Are my results ready now?", amap_result.ready()
print "If so, let's get them!"
if amap_result.ready():
    results = amap_result.get()
    print results
Notice that there was some weirdness in the printing here: i.e. the worker processes didn't actually cough up their print statements until after the get() was called. I can't tell you exactly what's going on here, but anyone who's ever played with threading, synchronization, and stdout before has seen weird things happen. The moral of the story: don't rely on printing from workers to notify you of anything.
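One way to act on that moral is to have each worker return a small report and do all of the printing in the parent. The sketch below assumes a hypothetical `timed_sleep` worker and a `summarize` helper; neither is part of the multiprocessing module.

```python
import os
import time


def timed_sleep(t):
    """Sleep for t seconds and report back instead of printing."""
    tic = time.time()
    time.sleep(t)
    # Returning the details lets the parent decide when and how to print them
    return {"pid": os.getpid(), "requested": t, "elapsed": time.time() - tic}


def summarize(reports):
    """Format worker reports in the parent, where stdout behaves predictably."""
    return ["worker %d slept %.1f s" % (r["pid"], r["requested"]) for r in reports]
```

With a pool `p` like the one above, something like `summarize(p.map(timed_sleep, [1, 1, 1]))` gives one line per task that the parent can print once the results are in.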
See simulationExample_multiprocessing.py
In [ ]:
import numpy as np
import time
from multiprocessing import Pool, cpu_count
import sys
sys.path.append('..')
from numpyVectorization.motion_gauss import simulateParticles_loop
In [ ]:
# Define a wrapper function so we can pass in a tuple of args
def wrapper_fun(args):
    return simulateParticles_loop(*args)
In [ ]:
# Set up simulation/Pool parameters
num_particles = 100000
num_steps = 100
num_cpus = cpu_count()
# Determine how many particles to run on each worker
num_particles_per_core = int(num_particles / num_cpus)
args = [(num_particles_per_core, num_steps, False)] * num_cpus
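Note that `int(num_particles / num_cpus)` rounds down, so whenever the particle count isn't a multiple of the core count, a few particles are silently left out of the parallel run. A small sketch of one way to spread the remainder across the workers (the helper name `split_evenly` is hypothetical):

```python
def split_evenly(total, parts):
    """Split `total` items into `parts` chunk sizes that differ by at most one."""
    base, remainder = divmod(total, parts)
    # The first `remainder` chunks each take one extra item
    return [base + 1 if i < remainder else base for i in range(parts)]


# Hypothetical example: 100000 particles on 3 workers
chunk_sizes = split_evenly(100000, 3)
```

The argument list could then be built as `[(n, num_steps, False) for n in split_evenly(num_particles, num_cpus)]`, so that the chunk sizes always add up to `num_particles`.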
In [ ]:
# Pool-itize
tic = time.time()
p = Pool(num_cpus)
result = p.map_async(wrapper_fun, args)
poolresult = result.get()
toc = time.time()
print "%.5f sec to compute result with %s cores" %((toc-tic), num_cpus)
In [ ]:
# Serial again, for comparison
tic = time.time()
simulateParticles_loop(num_particles, num_steps, False)
toc = time.time()
print "%.5f sec to compute result without Pool" %(toc-tic)